use common_utils::{errors::CustomResult, ext_traits::AsyncExt};
#[cfg(feature = "v1")]
use common_utils::{fallback_reverse_lookup_not_found, types::ConnectorTransactionId};
use diesel_models::{
    enums::{
        MandateAmountData as DieselMandateAmountData, MandateDataType as DieselMandateType,
        MandateDetails as DieselMandateDetails, MerchantStorageScheme,
    },
    kv,
    payment_attempt::PaymentAttempt as DieselPaymentAttempt,
    reverse_lookup::{ReverseLookup, ReverseLookupNew},
};
use error_stack::ResultExt;
#[cfg(all(feature = "v1", feature = "olap"))]
use futures::future::{try_join_all, FutureExt};
#[cfg(feature = "v2")]
use hyperswitch_domain_models::behaviour::ReverseConversion;
use hyperswitch_domain_models::{
    behaviour::Conversion,
    mandates::{MandateAmountData, MandateDataType, MandateDetails},
    merchant_key_store::MerchantKeyStore,
    payments::payment_attempt::{PaymentAttempt, PaymentAttemptInterface, PaymentAttemptUpdate},
};
#[cfg(all(feature = "v1", feature = "olap"))]
use hyperswitch_domain_models::{
    payments::payment_attempt::PaymentListFilters, payments::PaymentIntent,
};
#[cfg(feature = "v2")]
use label::*;
use redis_interface::HsetnxReply;
use router_env::{instrument, tracing};

#[cfg(feature = "v2")]
use crate::kv_router_store::{FilterResourceParams, FindResourceBy, UpdateResourceParams};
use crate::{
    diesel_error_to_data_error, errors,
    errors::RedisErrorExt,
    kv_router_store::KVRouterStore,
    lookup::ReverseLookupInterface,
    redis::kv_store::{decide_storage_scheme, kv_wrapper, KvOperation, Op, PartitionKey},
    utils::{pg_connection_read, pg_connection_write, try_redis_get_else_try_database_get},
    DataModelExt, DatabaseStore, RouterStore,
};

#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentAttemptInterface for RouterStore<T> {
    type Error = errors::StorageError;
    /// Persists a new payment attempt and hands it back as a domain model.
    ///
    /// The domain value is turned into its insertable diesel form with `construct_new`, written
    /// through a connection from `pg_connection_write`, and the returned row is converted back
    /// using the key and merchant id held in `merchant_key_store`. `_storage_scheme` is not
    /// consulted by this implementation.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when `construct_new` fails.
    /// * The error produced by `diesel_error_to_data_error` when the insert fails.
    /// * `StorageError::DecryptionError` when the inserted row cannot be converted back.
    /// * Failures to obtain the connection or the key manager state are propagated.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn insert_payment_attempt(
        &self,
        payment_attempt: PaymentAttempt,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_write(self).await?;
        let new_attempt = payment_attempt
            .construct_new()
            .await
            .change_context(errors::StorageError::EncryptionError)?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let inserted = new_attempt.insert(&conn).await.map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            inserted,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Persists a new payment attempt (v2) and hands it back as a domain model.
    ///
    /// Steps, in order: obtain a connection from `pg_connection_write`, build the insertable
    /// diesel value with `construct_new`, insert it, fetch the key manager state, and convert
    /// the inserted row using the key and merchant id from `merchant_key_store`.
    /// `_storage_scheme` is not consulted by this implementation.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when `construct_new` fails.
    /// * The error produced by `diesel_error_to_data_error` when the insert fails.
    /// * `StorageError::DecryptionError` when the inserted row cannot be converted.
    /// * Failures to obtain the connection or the key manager state are propagated.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn insert_payment_attempt(
        &self,
        merchant_key_store: &MerchantKeyStore,
        payment_attempt: PaymentAttempt,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_write(self).await?;
        let new_attempt = payment_attempt
            .construct_new()
            .await
            .change_context(errors::StorageError::EncryptionError)?;

        let inserted = new_attempt.insert(&conn).await.map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        // The key manager state is only requested once the insert has succeeded.
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        inserted
            .convert(
                key_manager_state,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
            .await
            .change_context(errors::StorageError::DecryptionError)
    }

    /// Applies `payment_attempt` (an update) to the stored attempt `this` and returns the
    /// updated attempt as a domain model.
    ///
    /// `this` is converted to its diesel form, updated via `update_with_attempt_id` with the
    /// storage model of the update, and the resulting row is converted back using the key and
    /// merchant id from `merchant_key_store`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when converting `this` fails.
    /// * The error produced by `diesel_error_to_data_error` when the update fails.
    /// * `StorageError::DecryptionError` when the updated row cannot be converted back.
    /// * Failures to obtain the connection or the key manager state are propagated.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn update_payment_attempt_with_attempt_id(
        &self,
        this: PaymentAttempt,
        payment_attempt: PaymentAttemptUpdate,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_write(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let current = this
            .convert()
            .await
            .change_context(errors::StorageError::EncryptionError)?;

        let updated = current
            .update_with_attempt_id(&conn, payment_attempt.to_storage_model())
            .await
            .map_err(|report| {
                let storage_error = diesel_error_to_data_error(*report.current_context());
                report.change_context(storage_error)
            })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            updated,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Applies `payment_attempt` (an update) to the stored attempt `this` (v2) and returns the
    /// updated attempt as a domain model.
    ///
    /// `this` is converted to its diesel form, updated via `update_with_attempt_id` with a
    /// `PaymentAttemptUpdateInternal` built from the update, and the resulting row is converted
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when converting `this` fails.
    /// * The error produced by `diesel_error_to_data_error` when the update fails.
    /// * `StorageError::DecryptionError` when the updated row cannot be converted.
    /// * Failures to obtain the connection or the key manager state are propagated.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn update_payment_attempt(
        &self,
        merchant_key_store: &MerchantKeyStore,
        this: PaymentAttempt,
        payment_attempt: PaymentAttemptUpdate,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_write(self).await?;

        let current = Conversion::convert(this)
            .await
            .change_context(errors::StorageError::EncryptionError)?;

        let updated = current
            .update_with_attempt_id(
                &conn,
                diesel_models::PaymentAttemptUpdateInternal::from(payment_attempt),
            )
            .await
            .map_err(|report| {
                let storage_error = diesel_error_to_data_error(*report.current_context());
                report.change_context(storage_error)
            })?;

        // The key manager state is only requested once the update has succeeded.
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        updated
            .convert(
                key_manager_state,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
            .await
            .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches one payment attempt by connector transaction id, payment id and merchant id.
    ///
    /// Delegates to
    /// `DieselPaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id` on a
    /// connection from `pg_connection_read`, then converts the row back to the domain model
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(
        &self,
        connector_transaction_id: &ConnectorTransactionId,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_by_connector_transaction_id_payment_id_merchant_id(
            &conn,
            connector_transaction_id,
            payment_id,
            merchant_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches the attempt returned by
    /// `DieselPaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id` for the
    /// given payment id and merchant id.
    ///
    /// The query runs on a connection from `pg_connection_read`; the row is converted back to
    /// the domain model using the key and merchant id from `merchant_key_store`.
    /// `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_last_successful_attempt_by_payment_id_merchant_id(
            &conn,
            payment_id,
            merchant_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches the attempt returned by
    /// `DieselPaymentAttempt::find_last_successful_or_partially_captured_attempt_by_payment_id_merchant_id`
    /// for the given payment id and merchant id.
    ///
    /// The query runs on a connection from `pg_connection_read`; the row is converted back to
    /// the domain model using the key and merchant id from `merchant_key_store`.
    /// `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id_merchant_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_last_successful_or_partially_captured_attempt_by_payment_id_merchant_id(
            &conn,
            payment_id,
            merchant_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches the attempt returned by
    /// `DieselPaymentAttempt::find_last_successful_or_partially_captured_attempt_by_payment_id`
    /// for the given global payment id (v2).
    ///
    /// The query runs on a connection from `pg_connection_read`; the row is then converted
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        payment_id: &common_utils::id_type::GlobalPaymentId,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        let found =
            DieselPaymentAttempt::find_last_successful_or_partially_captured_attempt_by_payment_id(
                &conn, payment_id,
            )
            .await
            .map_err(|report| {
                let storage_error = diesel_error_to_data_error(*report.current_context());
                report.change_context(storage_error)
            })?;

        // The key manager state is only requested once the query has succeeded.
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        found
            .convert(
                key_manager_state,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
            .await
            .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches one payment attempt by merchant id and connector transaction id.
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_merchant_id_connector_txn_id` on a
    /// connection from `pg_connection_read`, then converts the row back to the domain model
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[instrument(skip_all)]
    #[cfg(feature = "v1")]
    async fn find_payment_attempt_by_merchant_id_connector_txn_id(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        connector_txn_id: &str,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_by_merchant_id_connector_txn_id(
            &conn,
            merchant_id,
            connector_txn_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches one payment attempt by profile id and connector transaction id (v2).
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_profile_id_connector_transaction_id` on a
    /// connection from `pg_connection_read`, then converts the row using the key and merchant
    /// id from `merchant_key_store`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted.
    #[instrument(skip_all)]
    #[cfg(feature = "v2")]
    async fn find_payment_attempt_by_profile_id_connector_transaction_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        profile_id: &common_utils::id_type::ProfileId,
        connector_txn_id: &str,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        let found = DieselPaymentAttempt::find_by_profile_id_connector_transaction_id(
            &conn,
            profile_id,
            connector_txn_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        // The key manager state is only requested once the query has succeeded.
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        found
            .convert(
                key_manager_state,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
            .await
            .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches one payment attempt by payment id, merchant id and attempt id.
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_payment_id_merchant_id_attempt_id` on a
    /// connection from `pg_connection_read`, then converts the row back to the domain model
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_payment_id_merchant_id_attempt_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        attempt_id: &str,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_by_payment_id_merchant_id_attempt_id(
            &conn,
            payment_id,
            merchant_id,
            attempt_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Builds the `PaymentListFilters` for the given payment intents of a merchant.
    ///
    /// Every intent in `pi` is cloned and converted to its diesel form (the conversions are
    /// awaited together through `try_join_all`); the converted intents are then passed to
    /// `DieselPaymentAttempt::get_filters_for_payments`, whose six-element tuple result is
    /// repackaged field by field into `PaymentListFilters`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when converting any intent fails.
    /// * The error produced by `diesel_error_to_data_error` when the query fails.
    /// * Failure to obtain the connection is propagated.
    #[cfg(all(feature = "v1", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_filters_for_payments(
        &self,
        pi: &[PaymentIntent],
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentListFilters, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        let diesel_intents = try_join_all(pi.iter().cloned().map(|intent| async {
            Conversion::convert(intent)
                .await
                .change_context(errors::StorageError::EncryptionError)
        }))
        .await?;

        let (
            connector,
            currency,
            status,
            payment_method,
            payment_method_type,
            authentication_type,
        ) = DieselPaymentAttempt::get_filters_for_payments(
            &conn,
            diesel_intents.as_slice(),
            merchant_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        Ok(PaymentListFilters {
            connector,
            currency,
            status,
            payment_method,
            payment_method_type,
            authentication_type,
        })
    }

    /// Fetches one payment attempt by preprocessing id and merchant id.
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_merchant_id_preprocessing_id` on a
    /// connection from `pg_connection_read`, then converts the row back to the domain model
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_preprocessing_id_merchant_id(
        &self,
        preprocessing_id: &str,
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found = DieselPaymentAttempt::find_by_merchant_id_preprocessing_id(
            &conn,
            merchant_id,
            preprocessing_id,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches every payment attempt stored for the given merchant id and payment id.
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_merchant_id_payment_id` on a connection from
    /// `pg_connection_read`. Each returned row is converted back to the domain model using the
    /// key and merchant id from `merchant_key_store`; the conversions are awaited together
    /// through `try_join_all`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when any row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_attempts_by_merchant_id_payment_id(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        payment_id: &common_utils::id_type::PaymentId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<Vec<PaymentAttempt>, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let diesel_attempts =
            DieselPaymentAttempt::find_by_merchant_id_payment_id(&conn, merchant_id, payment_id)
                .await
                .map_err(|report| {
                    let storage_error = diesel_error_to_data_error(*report.current_context());
                    report.change_context(storage_error)
                })?;

        // NOTE(review): `try_join_all` and the future `.map` adaptor come from an import at the
        // top of the file that is gated on `all(feature = "v1", feature = "olap")`, while this
        // method is gated on `v1` alone — confirm `olap` is always enabled together with `v1`.
        try_join_all(diesel_attempts.into_iter().map(|diesel_attempt| {
            PaymentAttempt::convert_back(
                key_manager_state,
                diesel_attempt,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
        }))
        .map(|joined| joined.change_context(errors::StorageError::DecryptionError))
        .await
    }

    /// Fetches one payment attempt by attempt id and merchant id.
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_merchant_id_attempt_id` on a connection from
    /// `pg_connection_read`, then converts the row back to the domain model using the key and
    /// merchant id from `merchant_key_store`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted back.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_attempt_id_merchant_id(
        &self,
        attempt_id: &str,
        merchant_id: &common_utils::id_type::MerchantId,
        _storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        let found =
            DieselPaymentAttempt::find_by_merchant_id_attempt_id(&conn, merchant_id, attempt_id)
                .await
                .map_err(|report| {
                    let storage_error = diesel_error_to_data_error(*report.current_context());
                    report.change_context(storage_error)
                })?;

        PaymentAttempt::convert_back(
            key_manager_state,
            found,
            merchant_key_store.key.get_inner(),
            merchant_key_store.merchant_id.clone().into(),
        )
        .await
        .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches one payment attempt by its global attempt id (v2).
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_id` on a connection from
    /// `pg_connection_read`, then converts the row using the key and merchant id from
    /// `merchant_key_store`. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// Propagates failures to obtain the connection or the key manager state, maps query
    /// failures through `diesel_error_to_data_error`, and reports
    /// `StorageError::DecryptionError` when the row cannot be converted.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        attempt_id: &common_utils::id_type::GlobalAttemptId,
        _storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        let found = DieselPaymentAttempt::find_by_id(&conn, attempt_id)
            .await
            .map_err(|report| {
                let storage_error = diesel_error_to_data_error(*report.current_context());
                report.change_context(storage_error)
            })?;

        // The key manager state is only requested once the query has succeeded.
        let key_manager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        found
            .convert(
                key_manager_state,
                merchant_key_store.key.get_inner(),
                merchant_key_store.merchant_id.clone().into(),
            )
            .await
            .change_context(errors::StorageError::DecryptionError)
    }

    /// Fetches every payment attempt stored for the given global payment id (v2).
    ///
    /// Delegates to `DieselPaymentAttempt::find_by_payment_id` on a connection from
    /// `pg_connection_read`, then converts the rows one after another, in the order returned,
    /// using the key and merchant id from `merchant_key_store`. `_storage_scheme` is not
    /// consulted.
    ///
    /// # Errors
    ///
    /// Propagates failure to obtain the connection, maps query failures through
    /// `diesel_error_to_data_error`, and stops at the first row for which the key manager
    /// state is unavailable or the conversion fails (`StorageError::DecryptionError`).
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempts_by_payment_intent_id(
        &self,
        payment_id: &common_utils::id_type::GlobalPaymentId,
        merchant_key_store: &MerchantKeyStore,
        _storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<Vec<PaymentAttempt>, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        let diesel_attempts = DieselPaymentAttempt::find_by_payment_id(&conn, payment_id)
            .await
            .map_err(|report| {
                let storage_error = diesel_error_to_data_error(*report.current_context());
                report.change_context(storage_error)
            })?;

        let mut attempts = Vec::with_capacity(diesel_attempts.len());
        for diesel_attempt in diesel_attempts {
            // Requested per row, so an empty result never needs the key manager state.
            let key_manager_state = self
                .get_keymanager_state()
                .attach_printable("Missing KeyManagerState")?;

            let attempt = diesel_attempt
                .convert(
                    key_manager_state,
                    merchant_key_store.key.get_inner(),
                    merchant_key_store.merchant_id.clone().into(),
                )
                .await
                .change_context(errors::StorageError::DecryptionError)?;

            attempts.push(attempt);
        }

        Ok(attempts)
    }

    /// Returns the count reported by `DieselPaymentAttempt::get_total_count_of_attempts` for
    /// the given merchant, active attempt ids and optional filters.
    ///
    /// The connection is taken directly from `self.db_store.get_replica_pool()`. The optional
    /// `connector` list is converted to strings with `to_string` before being passed on; all
    /// other filters are forwarded unchanged. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// * `StorageError::DatabaseConnectionError` when no connection can be obtained.
    /// * The error produced by `diesel_error_to_data_error` when the query fails.
    #[cfg(all(feature = "v1", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_total_count_of_filtered_payment_attempts(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        active_attempt_ids: &[String],
        connector: Option<Vec<api_models::enums::Connector>>,
        payment_method: Option<Vec<common_enums::PaymentMethod>>,
        payment_method_type: Option<Vec<common_enums::PaymentMethodType>>,
        authentication_type: Option<Vec<common_enums::AuthenticationType>>,
        merchant_connector_id: Option<Vec<common_utils::id_type::MerchantConnectorAccountId>>,
        card_network: Option<Vec<common_enums::CardNetwork>>,
        card_discovery: Option<Vec<common_enums::CardDiscovery>>,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<i64, errors::StorageError> {
        let replica_pool = self.db_store.get_replica_pool();
        let conn = replica_pool
            .get()
            .await
            .change_context(errors::StorageError::DatabaseConnectionError)?;

        let connector_names = connector.as_ref().map(|connectors| {
            connectors
                .iter()
                .map(|name| name.to_string())
                .collect::<Vec<String>>()
        });

        let total = DieselPaymentAttempt::get_total_count_of_attempts(
            &conn,
            merchant_id,
            active_attempt_ids,
            connector_names,
            payment_method,
            payment_method_type,
            authentication_type,
            merchant_connector_id,
            card_network,
            card_discovery,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        Ok(total)
    }
    /// Returns the count reported by `DieselPaymentAttempt::get_total_count_of_attempts` for
    /// the given merchant, active attempt ids and optional filters (v2).
    ///
    /// The connection is taken directly from `self.db_store.get_replica_pool()`. The optional
    /// `connector` list is converted to strings with `to_string` before being passed on; all
    /// other filters are forwarded unchanged. `_storage_scheme` is not consulted.
    ///
    /// # Errors
    ///
    /// * `StorageError::DatabaseConnectionError` when no connection can be obtained.
    /// * The error produced by `diesel_error_to_data_error` when the query fails.
    #[cfg(all(feature = "v2", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_total_count_of_filtered_payment_attempts(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        active_attempt_ids: &[String],
        connector: Option<Vec<api_models::enums::Connector>>,
        payment_method_type: Option<Vec<common_enums::PaymentMethod>>,
        payment_method_subtype: Option<Vec<common_enums::PaymentMethodType>>,
        authentication_type: Option<Vec<common_enums::AuthenticationType>>,
        merchant_connector_id: Option<Vec<common_utils::id_type::MerchantConnectorAccountId>>,
        card_network: Option<Vec<common_enums::CardNetwork>>,
        _storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<i64, errors::StorageError> {
        let replica_pool = self.db_store.get_replica_pool();
        let conn = replica_pool
            .get()
            .await
            .change_context(errors::StorageError::DatabaseConnectionError)?;

        let total = DieselPaymentAttempt::get_total_count_of_attempts(
            &conn,
            merchant_id,
            active_attempt_ids,
            // Kept inline so the collection type is inferred from the callee's parameter.
            connector
                .as_ref()
                .map(|names| names.iter().map(|name| name.to_string()).collect()),
            payment_method_type,
            payment_method_subtype,
            authentication_type,
            merchant_connector_id,
            card_network,
        )
        .await
        .map_err(|report| {
            let storage_error = diesel_error_to_data_error(*report.current_context());
            report.change_context(storage_error)
        })?;

        Ok(total)
    }
}

#[async_trait::async_trait]
impl<T: DatabaseStore> PaymentAttemptInterface for KVRouterStore<T> {
    type Error = errors::StorageError;
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn insert_payment_attempt(
        &self,
        payment_attempt: PaymentAttempt,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Insert,
        ))
        .await;
        match storage_scheme {
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .insert_payment_attempt(payment_attempt, storage_scheme, merchant_key_store)
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let merchant_id = payment_attempt.merchant_id.clone();
                let payment_id = payment_attempt.payment_id.clone();
                let key = PartitionKey::MerchantIdPaymentId {
                    merchant_id: &merchant_id,
                    payment_id: &payment_id,
                };
                let key_str = key.to_string();
                let created_attempt = PaymentAttempt {
                    payment_id: payment_attempt.payment_id.clone(),
                    merchant_id: payment_attempt.merchant_id.clone(),
                    attempt_id: payment_attempt.attempt_id.clone(),
                    status: payment_attempt.status,
                    net_amount: payment_attempt.net_amount.clone(),
                    currency: payment_attempt.currency,
                    save_to_locker: payment_attempt.save_to_locker,
                    connector: payment_attempt.connector.clone(),
                    error_message: payment_attempt.error_message.clone(),
                    offer_amount: payment_attempt.offer_amount,
                    payment_method_id: payment_attempt.payment_method_id.clone(),
                    payment_method: payment_attempt.payment_method,
                    connector_transaction_id: None,
                    capture_method: payment_attempt.capture_method,
                    capture_on: payment_attempt.capture_on,
                    confirm: payment_attempt.confirm,
                    authentication_type: payment_attempt.authentication_type,
                    created_at: payment_attempt.created_at,
                    modified_at: payment_attempt.created_at,
                    last_synced: payment_attempt.last_synced,
                    amount_to_capture: payment_attempt.amount_to_capture,
                    cancellation_reason: payment_attempt.cancellation_reason.clone(),
                    mandate_id: payment_attempt.mandate_id.clone(),
                    browser_info: payment_attempt.browser_info.clone(),
                    payment_token: payment_attempt.payment_token.clone(),
                    error_code: payment_attempt.error_code.clone(),
                    connector_metadata: payment_attempt.connector_metadata.clone(),
                    payment_experience: payment_attempt.payment_experience,
                    payment_method_type: payment_attempt.payment_method_type,
                    payment_method_data: payment_attempt.payment_method_data.clone(),
                    business_sub_label: payment_attempt.business_sub_label.clone(),
                    straight_through_algorithm: payment_attempt.straight_through_algorithm.clone(),
                    mandate_details: payment_attempt.mandate_details.clone(),
                    preprocessing_step_id: payment_attempt.preprocessing_step_id.clone(),
                    error_reason: payment_attempt.error_reason.clone(),
                    multiple_capture_count: payment_attempt.multiple_capture_count,
                    connector_response_reference_id: None,
                    charge_id: None,
                    amount_capturable: payment_attempt.amount_capturable,
                    updated_by: storage_scheme.to_string(),
                    authentication_data: payment_attempt.authentication_data.clone(),
                    encoded_data: payment_attempt.encoded_data.clone(),
                    merchant_connector_id: payment_attempt.merchant_connector_id.clone(),
                    unified_code: payment_attempt.unified_code.clone(),
                    unified_message: payment_attempt.unified_message.clone(),
                    external_three_ds_authentication_attempted: payment_attempt
                        .external_three_ds_authentication_attempted,
                    authentication_connector: payment_attempt.authentication_connector.clone(),
                    authentication_id: payment_attempt.authentication_id.clone(),
                    mandate_data: payment_attempt.mandate_data.clone(),
                    payment_method_billing_address_id: payment_attempt
                        .payment_method_billing_address_id
                        .clone(),
                    fingerprint_id: payment_attempt.fingerprint_id.clone(),
                    client_source: payment_attempt.client_source.clone(),
                    client_version: payment_attempt.client_version.clone(),
                    customer_acceptance: payment_attempt.customer_acceptance.clone(),
                    organization_id: payment_attempt.organization_id.clone(),
                    profile_id: payment_attempt.profile_id.clone(),
                    connector_mandate_detail: payment_attempt.connector_mandate_detail.clone(),
                    request_extended_authorization: payment_attempt.request_extended_authorization,
                    extended_authorization_applied: payment_attempt.extended_authorization_applied,
                    extended_authorization_last_applied_at: payment_attempt
                        .extended_authorization_last_applied_at,
                    capture_before: payment_attempt.capture_before,
                    card_discovery: payment_attempt.card_discovery,
                    charges: None,
                    issuer_error_code: None,
                    issuer_error_message: None,
                    processor_merchant_id: payment_attempt.processor_merchant_id.clone(),
                    created_by: payment_attempt.created_by.clone(),
                    setup_future_usage_applied: payment_attempt.setup_future_usage_applied,
                    routing_approach: payment_attempt.routing_approach.clone(),
                    connector_request_reference_id: payment_attempt
                        .connector_request_reference_id
                        .clone(),
                    debit_routing_savings: None,
                    network_transaction_id: payment_attempt.network_transaction_id.clone(),
                    is_overcapture_enabled: None,
                    network_details: payment_attempt.network_details.clone(),
                    is_stored_credential: payment_attempt.is_stored_credential,
                    authorized_amount: payment_attempt.authorized_amount,
                    tokenization: payment_attempt.tokenization,
                    encrypted_payment_method_data: payment_attempt
                        .encrypted_payment_method_data
                        .clone(),
                };

                let field = format!("pa_{}", created_attempt.attempt_id);

                let redis_entry = kv::TypedSql {
                    op: kv::DBOperation::Insert {
                        insertable: Box::new(kv::Insertable::PaymentAttempt(Box::new(
                            payment_attempt
                                .construct_new()
                                .await
                                .change_context(errors::StorageError::EncryptionError)?,
                        ))),
                    },
                };

                //Reverse lookup for attempt_id
                let reverse_lookup = ReverseLookupNew {
                    lookup_id: format!(
                        "pa_{}_{}",
                        created_attempt.merchant_id.get_string_repr(),
                        &created_attempt.attempt_id,
                    ),
                    pk_id: key_str.clone(),
                    sk_id: field.clone(),
                    source: "payment_attempt".to_string(),
                    updated_by: storage_scheme.to_string(),
                };
                self.insert_reverse_lookup(reverse_lookup, storage_scheme)
                    .await?;

                match Box::pin(kv_wrapper::<DieselPaymentAttempt, _, _>(
                    self,
                    KvOperation::HSetNx(
                        &field,
                        &created_attempt
                            .clone()
                            .convert()
                            .await
                            .change_context(errors::StorageError::EncryptionError)?,
                        redis_entry,
                    ),
                    key,
                ))
                .await
                .map_err(|err| err.to_redis_failed_response(&key_str))?
                .try_into_hsetnx()
                {
                    Ok(HsetnxReply::KeyNotSet) => Err(errors::StorageError::DuplicateValue {
                        entity: "payment attempt",
                        key: Some(key_str),
                    }
                    .into()),
                    Ok(HsetnxReply::KeySet) => Ok(created_attempt),
                    Err(error) => Err(error.change_context(errors::StorageError::KVError)),
                }
            }
        }
    }

    /// Persists a new v2 payment attempt in the store selected for this merchant.
    ///
    /// `decide_storage_scheme` is consulted with `Op::Insert`. A `PostgresOnly`
    /// decision delegates to `self.router_store`. A `RedisKv` decision writes the
    /// attempt with `HSetNx` into the partition keyed by the attempt's
    /// `payment_id`, after inserting reverse lookups for the attempt id and, when
    /// `connector_payment_id` is set, for the profile id / connector transaction
    /// id pair.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when converting the domain model to its
    ///   storage representations fails.
    /// * `StorageError::DuplicateValue` when `HSetNx` reports `KeyNotSet`.
    /// * `StorageError::KVError` when `try_into_hsetnx` fails.
    /// * Failures from `insert_reverse_lookup` are propagated; failures of the KV
    ///   call itself are mapped through `to_redis_failed_response`.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn insert_payment_attempt(
        &self,
        merchant_key_store: &MerchantKeyStore,
        payment_attempt: PaymentAttempt,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Insert,
        ))
        .await;

        match decided_storage_scheme {
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .insert_payment_attempt(
                        merchant_key_store,
                        payment_attempt,
                        decided_storage_scheme,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let partition_key = PartitionKey::GlobalPaymentId {
                    id: &payment_attempt.payment_id,
                };
                let partition_key_str = partition_key.to_string();
                let hash_field = format!(
                    "{}_{}",
                    label::CLUSTER_LABEL,
                    payment_attempt.id.get_string_repr()
                );

                // Insertable form of the attempt, carried in the typed SQL entry
                // that accompanies the Redis write.
                let insertable_attempt = payment_attempt
                    .clone()
                    .construct_new()
                    .await
                    .change_context(errors::StorageError::EncryptionError)?;
                let redis_entry = kv::TypedSql {
                    op: kv::DBOperation::Insert {
                        insertable: Box::new(kv::Insertable::PaymentAttempt(Box::new(
                            insertable_attempt,
                        ))),
                    },
                };

                // Storage-model form of the attempt, used as the hash value.
                let redis_value: DieselPaymentAttempt =
                    Conversion::convert(payment_attempt.clone())
                        .await
                        .change_context(errors::StorageError::EncryptionError)?;

                // Reverse lookups are inserted before the hash write: first the
                // attempt id, then (only when present) the connector payment id.
                let attempt_id_lookup = label::get_global_id_label(&payment_attempt.id);
                let connector_txn_lookup =
                    payment_attempt
                        .connector_payment_id
                        .as_ref()
                        .map(|connector_txn_id| {
                            label::get_profile_id_connector_transaction_label(
                                payment_attempt.profile_id.get_string_repr(),
                                connector_txn_id,
                            )
                        });
                for lookup_id in std::iter::once(attempt_id_lookup).chain(connector_txn_lookup) {
                    let reverse_lookup = ReverseLookupNew {
                        lookup_id,
                        pk_id: partition_key_str.clone(),
                        sk_id: hash_field.clone(),
                        source: "payment_attempt".to_string(),
                        updated_by: decided_storage_scheme.to_string(),
                    };
                    self.insert_reverse_lookup(reverse_lookup, decided_storage_scheme)
                        .await?;
                }

                let hsetnx_reply = Box::pin(kv_wrapper::<DieselPaymentAttempt, _, _>(
                    self,
                    KvOperation::HSetNx(&hash_field, &redis_value, redis_entry),
                    partition_key,
                ))
                .await
                .map_err(|err| err.to_redis_failed_response(&partition_key_str))?
                .try_into_hsetnx()
                .change_context(errors::StorageError::KVError)?;

                match hsetnx_reply {
                    HsetnxReply::KeySet => Ok(payment_attempt),
                    // The field already existed, so nothing was written.
                    HsetnxReply::KeyNotSet => Err(errors::StorageError::DuplicateValue {
                        entity: "payment_attempt",
                        key: Some(payment_attempt.id.get_string_repr().to_owned()),
                    }
                    .into()),
                }
            }
        }
    }

    /// Applies `payment_attempt` to the existing v1 attempt `this` and stores the
    /// result in the store selected for this merchant.
    ///
    /// `decide_storage_scheme` is consulted with an `Op::Update` on the
    /// `pa_{attempt_id}` field of the merchant id / payment id partition. A
    /// `PostgresOnly` decision delegates to `self.router_store`. A `RedisKv`
    /// decision applies the changeset in memory, registers reverse lookups for a
    /// connector payment id or preprocessing step id that is newly set or has
    /// changed, and then writes the serialized attempt with `Hset`.
    ///
    /// # Errors
    ///
    /// * `StorageError::EncryptionError` when converting to or from the storage
    ///   model fails.
    /// * `StorageError::KVError` when serialization, the KV call, or
    ///   `try_into_hset` fails.
    /// * Failures from `get_keymanager_state` and from the reverse-lookup helpers
    ///   are propagated.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn update_payment_attempt_with_attempt_id(
        &self,
        this: PaymentAttempt,
        payment_attempt: PaymentAttemptUpdate,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let partition_key = PartitionKey::MerchantIdPaymentId {
            merchant_id: &this.merchant_id,
            payment_id: &this.payment_id,
        };
        let hash_field = format!("pa_{}", this.attempt_id);
        let decided_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Update(partition_key.clone(), &hash_field, Some(&this.updated_by)),
        ))
        .await;

        match decided_scheme {
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .update_payment_attempt_with_attempt_id(
                        this,
                        payment_attempt,
                        decided_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let partition_key_str = partition_key.to_string();
                let key_manager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // Apply the update to the storage model of `this`, then convert the
                // result back into the domain model that is returned to the caller.
                let current_storage_attempt = this
                    .clone()
                    .convert()
                    .await
                    .change_context(errors::StorageError::EncryptionError)?;
                let changed_storage_attempt = payment_attempt
                    .clone()
                    .to_storage_model()
                    .apply_changeset(current_storage_attempt);
                let updated_attempt = PaymentAttempt::convert_back(
                    key_manager_state,
                    changed_storage_attempt,
                    merchant_key_store.key.get_inner(),
                    merchant_key_store.merchant_id.clone().into(),
                )
                .await
                .change_context(errors::StorageError::EncryptionError)
                .attach_printable("Error while constructing domain model")?;

                // TODO: check for presence in the database as well; a read replica
                // might be usable here.
                let redis_value = serde_json::to_string(&updated_attempt)
                    .change_context(errors::StorageError::KVError)?;

                // Typed SQL entry carrying the original row and the update data.
                let original_storage_attempt = this
                    .clone()
                    .convert()
                    .await
                    .change_context(errors::StorageError::EncryptionError)?;
                let redis_entry = kv::TypedSql {
                    op: kv::DBOperation::Update {
                        updatable: Box::new(kv::Updateable::PaymentAttemptUpdate(Box::new(
                            kv::PaymentAttemptUpdateMems {
                                orig: original_storage_attempt,
                                update_data: payment_attempt.to_storage_model(),
                            },
                        ))),
                    },
                };

                // Register a reverse lookup when the connector payment id is set
                // for the first time or differs from the previous one.
                let previous_connector_txn_id = &this.get_connector_payment_id();
                if let Some(new_connector_txn_id) = &updated_attempt.get_connector_payment_id() {
                    let is_new_or_changed = match previous_connector_txn_id {
                        None => true,
                        Some(previous) => previous.ne(new_connector_txn_id),
                    };
                    if is_new_or_changed {
                        add_connector_txn_id_to_reverse_lookup(
                            self,
                            partition_key_str.as_str(),
                            &this.merchant_id,
                            updated_attempt.attempt_id.as_str(),
                            new_connector_txn_id,
                            decided_scheme,
                        )
                        .await?;
                    }
                }

                // Same rule for the preprocessing step id.
                let previous_preprocessing_id = &this.preprocessing_step_id;
                if let Some(new_preprocessing_id) = &updated_attempt.preprocessing_step_id {
                    let is_new_or_changed = match previous_preprocessing_id {
                        None => true,
                        Some(previous) => previous.ne(new_preprocessing_id),
                    };
                    if is_new_or_changed {
                        add_preprocessing_id_to_reverse_lookup(
                            self,
                            partition_key_str.as_str(),
                            &this.merchant_id,
                            updated_attempt.attempt_id.as_str(),
                            new_preprocessing_id.as_str(),
                            decided_scheme,
                        )
                        .await?;
                    }
                }

                Box::pin(kv_wrapper::<(), _, _>(
                    self,
                    KvOperation::Hset::<DieselPaymentAttempt>(
                        (&hash_field, redis_value),
                        redis_entry,
                    ),
                    partition_key,
                ))
                .await
                .change_context(errors::StorageError::KVError)?
                .try_into_hset()
                .change_context(errors::StorageError::KVError)?;

                Ok(updated_attempt)
            }
        }
    }

    /// Applies `payment_attempt_update` to the existing v2 attempt `this` by
    /// handing both the database update and the expected post-update row to
    /// `update_resource`.
    ///
    /// The KV operation passed along is an `Op::Update` on the
    /// `{CLUSTER_LABEL}_{attempt id}` field of the partition keyed by the
    /// attempt's `payment_id`.
    ///
    /// # Errors
    ///
    /// * `StorageError::DecryptionError` when converting `this` to its storage
    ///   model fails.
    /// * Failures from `pg_connection_write` and `update_resource` are propagated.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn update_payment_attempt(
        &self,
        merchant_key_store: &MerchantKeyStore,
        this: PaymentAttempt,
        payment_attempt_update: PaymentAttemptUpdate,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        // NOTE(review): sibling methods report `EncryptionError` for this same
        // domain-to-storage conversion; confirm `DecryptionError` is intended.
        let current_attempt = Conversion::convert(this.clone())
            .await
            .change_context(errors::StorageError::DecryptionError)?;
        let conn = pg_connection_write(self).await?;

        let changeset = diesel_models::PaymentAttemptUpdateInternal::from(payment_attempt_update);

        // Row as it is expected to look once the changeset has been applied.
        let expected_attempt = changeset.clone().apply_changeset(current_attempt.clone());
        let updated_by = expected_attempt.updated_by.clone();

        // Database update, passed un-awaited to `update_resource`.
        let database_update = current_attempt
            .clone()
            .update_with_attempt_id(&conn, changeset.clone());

        let partition_key = PartitionKey::GlobalPaymentId {
            id: &this.payment_id,
        };
        let hash_field = format!("{}_{}", label::CLUSTER_LABEL, this.id.get_string_repr());
        let update_params = UpdateResourceParams {
            updateable: kv::Updateable::PaymentAttemptUpdate(Box::new(
                kv::PaymentAttemptUpdateMems {
                    orig: current_attempt,
                    update_data: changeset,
                },
            )),
            operation: Op::Update(partition_key, &hash_field, Some(updated_by.as_str())),
        };

        Box::pin(self.update_resource(
            merchant_key_store,
            storage_scheme,
            database_update,
            expected_attempt,
            update_params,
        ))
        .await
    }

    /// Finds a v1 payment attempt by connector transaction id, payment id and
    /// merchant id.
    ///
    /// `decide_storage_scheme` is consulted with `Op::Find`. A `PostgresOnly`
    /// decision delegates to `self.router_store`. A `RedisKv` decision resolves
    /// the reverse lookup `pa_conn_trans_{merchant_id}_{connector_transaction_id}`
    /// and reads the referenced hash field, with the `self.router_store` query
    /// supplied as the database alternative to
    /// `try_redis_get_else_try_database_get`.
    ///
    /// # Errors
    ///
    /// Failures from `get_keymanager_state`, the reverse lookup, and the final
    /// Redis/database read are propagated.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(
        &self,
        connector_transaction_id: &ConnectorTransactionId,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        // Single definition of the database query; it receives the decided
        // storage scheme, as every call site did before.
        let database_call = || {
            self.router_store
                .find_payment_attempt_by_connector_transaction_id_payment_id_merchant_id(
                    connector_transaction_id,
                    payment_id,
                    merchant_id,
                    storage_scheme,
                    merchant_key_store,
                )
        };

        match storage_scheme {
            MerchantStorageScheme::PostgresOnly => database_call().await,
            MerchantStorageScheme::RedisKv => {
                // We assume that PaymentAttempt <=> PaymentIntent is a one-to-one relation for now
                let lookup_id = format!(
                    "pa_conn_trans_{}_{}",
                    merchant_id.get_string_repr(),
                    connector_transaction_id.get_id()
                );
                // NOTE(review): the macro is defined outside this file; presumably
                // it returns the database result early when the reverse lookup is
                // not found — confirm against its definition.
                let lookup = fallback_reverse_lookup_not_found!(
                    self.get_lookup_by_lookup_id(&lookup_id, storage_scheme)
                        .await,
                    database_call().await
                );

                let key = PartitionKey::CombinationKey {
                    combination: &lookup.pk_id,
                };
                let key_manager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                let redis_fut = async {
                    let diesel_payment_attempt = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::HGet(&lookup.sk_id),
                        key,
                    ))
                    .await?
                    .try_into_hget()?;
                    PaymentAttempt::convert_back(
                        key_manager_state,
                        diesel_payment_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Finds the most recently modified v1 attempt of a payment whose status is
    /// `Charged`.
    ///
    /// `decide_storage_scheme` is consulted with `Op::Find`. A `PostgresOnly`
    /// decision delegates to `self.router_store`. A `RedisKv` decision scans the
    /// `pa_*` fields of the merchant id / payment id partition and picks the
    /// `Charged` attempt with the greatest `modified_at`, with the
    /// `self.router_store` query supplied as the database alternative to
    /// `try_redis_get_else_try_database_get`.
    ///
    /// # Errors
    ///
    /// Failures from `get_keymanager_state` and from the final Redis/database
    /// read are propagated. Inside the Redis path, a scan without a `Charged`
    /// attempt yields `RedisError::NotFound` and a failed conversion yields
    /// `RedisError::UnknownResult`.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        // Uses the caller-supplied `storage_scheme`, not the decided one.
        let fetch_from_database = || {
            self.router_store
                .find_payment_attempt_last_successful_attempt_by_payment_id_merchant_id(
                    payment_id,
                    merchant_id,
                    storage_scheme,
                    merchant_key_store,
                )
        };
        let decided_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match decided_scheme {
            MerchantStorageScheme::PostgresOnly => fetch_from_database().await,
            MerchantStorageScheme::RedisKv => {
                let partition_key = PartitionKey::MerchantIdPaymentId {
                    merchant_id,
                    payment_id,
                };
                let key_manager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                let fetch_from_redis = async {
                    let scanned_attempts = Box::pin(kv_wrapper::<DieselPaymentAttempt, _, _>(
                        self,
                        KvOperation::<DieselPaymentAttempt>::Scan("pa_*"),
                        partition_key,
                    ))
                    .await?
                    .try_into_scan()?;

                    // `min_by` with a reversed comparator yields the greatest
                    // `modified_at` and, on ties, the first such attempt in scan
                    // order — the same element a stable descending sort followed
                    // by `find` selects.
                    let latest_charged = scanned_attempts
                        .into_iter()
                        .filter(|attempt| {
                            attempt.status == api_models::enums::AttemptStatus::Charged
                        })
                        .min_by(|a, b| b.modified_at.cmp(&a.modified_at))
                        .ok_or_else(|| {
                            error_stack::report!(redis_interface::errors::RedisError::NotFound)
                        })?;

                    PaymentAttempt::convert_back(
                        key_manager_state,
                        latest_charged,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                Box::pin(try_redis_get_else_try_database_get(
                    fetch_from_redis,
                    fetch_from_database,
                ))
                .await
            }
        }
    }

    /// Finds the most recently modified v1 attempt of a payment whose status is
    /// `Charged` or `PartialCharged`.
    ///
    /// `decide_storage_scheme` is consulted with `Op::Find`. A `PostgresOnly`
    /// decision delegates to `self.router_store`. A `RedisKv` decision scans the
    /// `pa_*` fields of the merchant id / payment id partition and picks the
    /// matching attempt with the greatest `modified_at`, with the
    /// `self.router_store` query supplied as the database alternative to
    /// `try_redis_get_else_try_database_get`.
    ///
    /// # Errors
    ///
    /// Failures from `get_keymanager_state` and from the final Redis/database
    /// read are propagated. Inside the Redis path, a scan without a matching
    /// attempt yields `RedisError::NotFound` and a failed conversion yields
    /// `RedisError::UnknownResult`.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id_merchant_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        // Uses the caller-supplied `storage_scheme`, not the decided one.
        let query_database = || {
            self.router_store
                .find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id_merchant_id(
                    payment_id,
                    merchant_id,
                    storage_scheme,
                    merchant_key_store,
                )
        };
        let effective_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match effective_scheme {
            MerchantStorageScheme::PostgresOnly => query_database().await,
            MerchantStorageScheme::RedisKv => {
                let partition = PartitionKey::MerchantIdPaymentId {
                    merchant_id,
                    payment_id,
                };
                let key_manager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                let query_redis = async {
                    let candidates = Box::pin(kv_wrapper::<DieselPaymentAttempt, _, _>(
                        self,
                        KvOperation::<DieselPaymentAttempt>::Scan("pa_*"),
                        partition,
                    ))
                    .await?
                    .try_into_scan()?;

                    // Reversed comparator: the "minimum" is the newest attempt,
                    // and ties resolve to the first one in scan order, matching a
                    // stable descending sort followed by `find`.
                    let newest_captured = candidates
                        .into_iter()
                        .filter(|candidate| {
                            candidate.status == api_models::enums::AttemptStatus::Charged
                                || candidate.status
                                    == api_models::enums::AttemptStatus::PartialCharged
                        })
                        .min_by(|left, right| right.modified_at.cmp(&left.modified_at))
                        .ok_or_else(|| {
                            error_stack::report!(redis_interface::errors::RedisError::NotFound)
                        })?;

                    PaymentAttempt::convert_back(
                        key_manager_state,
                        newest_captured,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                Box::pin(try_redis_get_else_try_database_get(
                    query_redis,
                    query_database,
                ))
                .await
            }
        }
    }

    /// Finds the most recently modified v2 attempt of a payment whose status is
    /// `Charged` or `PartialCharged`.
    ///
    /// `decide_storage_scheme` is consulted with `Op::Find`. A `PostgresOnly`
    /// decision delegates to `self.router_store`. A `RedisKv` decision scans the
    /// attempt fields of the partition keyed by `payment_id` and picks the
    /// matching attempt with the greatest `modified_at`, with the
    /// `self.router_store` query supplied as the database alternative to
    /// `try_redis_get_else_try_database_get`.
    ///
    /// # Errors
    ///
    /// Failures from `get_keymanager_state` (checked before the scheme is
    /// inspected) and from the final Redis/database read are propagated. Inside
    /// the Redis path, a scan without a matching attempt yields
    /// `RedisError::NotFound` and a failed conversion yields
    /// `RedisError::UnknownResult`.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        payment_id: &common_utils::id_type::GlobalPaymentId,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        // Uses the caller-supplied `storage_scheme`, not the decided one.
        let database_call = || {
            self.router_store
                .find_payment_attempt_last_successful_or_partially_captured_attempt_by_payment_id(
                    merchant_key_store,
                    payment_id,
                    storage_scheme,
                )
        };

        let decided_storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;
        let keymanager_state = self
            .get_keymanager_state()
            .attach_printable("Missing KeyManagerState")?;

        match decided_storage_scheme {
            MerchantStorageScheme::PostgresOnly => database_call().await,
            MerchantStorageScheme::RedisKv => {
                let key = PartitionKey::GlobalPaymentId { id: payment_id };

                // v2 attempts are written under `{CLUSTER_LABEL}_{attempt id}`, so
                // the scan pattern is built from the same label instead of a
                // hard-coded prefix that could drift from the writer.
                let scan_pattern = format!("{}_*", label::CLUSTER_LABEL);

                let redis_fut = async {
                    let scanned_attempts = kv_wrapper::<DieselPaymentAttempt, _, _>(
                        self,
                        KvOperation::<DieselPaymentAttempt>::Scan(scan_pattern.as_str()),
                        key,
                    )
                    .await?
                    .try_into_scan()?;

                    // Reversed comparator: the "minimum" is the newest attempt,
                    // and ties resolve to the first one in scan order, matching a
                    // stable descending sort followed by `find`.
                    let payment_attempt = scanned_attempts
                        .into_iter()
                        .filter(|pa| {
                            pa.status == diesel_models::enums::AttemptStatus::Charged
                                || pa.status
                                    == diesel_models::enums::AttemptStatus::PartialCharged
                        })
                        .min_by(|a, b| b.modified_at.cmp(&a.modified_at))
                        .ok_or_else(|| {
                            error_stack::report!(redis_interface::errors::RedisError::NotFound)
                        })?;

                    let merchant_id = payment_attempt.merchant_id.clone();
                    PaymentAttempt::convert_back(
                        keymanager_state,
                        payment_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Finds a v2 payment attempt by profile id and connector transaction id.
    ///
    /// The database query and the reverse-lookup id built by
    /// `label::get_profile_id_connector_transaction_label` are both handed to
    /// `find_resource_by_id`, which performs the actual read.
    ///
    /// # Errors
    ///
    /// Failures from `pg_connection_read` and `find_resource_by_id` are
    /// propagated.
    #[cfg(feature = "v2")]
    async fn find_payment_attempt_by_profile_id_connector_transaction_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        profile_id: &common_utils::id_type::ProfileId,
        connector_transaction_id: &str,
        storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<PaymentAttempt, errors::StorageError> {
        let conn = pg_connection_read(self).await?;

        // Database query, passed un-awaited to `find_resource_by_id`.
        let database_query = DieselPaymentAttempt::find_by_profile_id_connector_transaction_id(
            &conn,
            profile_id,
            connector_transaction_id,
        );
        let lookup_id = label::get_profile_id_connector_transaction_label(
            profile_id.get_string_repr(),
            connector_transaction_id,
        );

        self.find_resource_by_id(
            merchant_key_store,
            storage_scheme,
            database_query,
            FindResourceBy::LookupId(lookup_id),
        )
        .await
    }

    /// Finds the payment attempt for a merchant id and connector transaction id.
    ///
    /// The storage scheme is first resolved through `decide_storage_scheme` for `Op::Find`:
    ///
    /// - `PostgresOnly`: the lookup is delegated to `router_store`.
    /// - `RedisKv`: the reverse lookup stored under
    ///   `pa_conn_trans_{merchant_id}_{connector_txn_id}` provides the partition key (`pk_id`)
    ///   and hash field (`sk_id`) for an `HGet`, and the fetched record is converted into the
    ///   domain model with `PaymentAttempt::convert_back`. The `router_store` query is passed
    ///   as the fallback to both `fallback_reverse_lookup_not_found!` and
    ///   `try_redis_get_else_try_database_get`.
    #[instrument(skip_all)]
    #[cfg(feature = "v1")]
    async fn find_payment_attempt_by_merchant_id_connector_txn_id(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        connector_txn_id: &str,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match storage_scheme {
            // Database-only merchants never touch the KV store.
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .find_payment_attempt_by_merchant_id_connector_txn_id(
                        merchant_id,
                        connector_txn_id,
                        storage_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let reverse_lookup_id = format!(
                    "pa_conn_trans_{}_{connector_txn_id}",
                    merchant_id.get_string_repr()
                );
                let reverse_lookup = fallback_reverse_lookup_not_found!(
                    self.get_lookup_by_lookup_id(&reverse_lookup_id, storage_scheme)
                        .await,
                    self.router_store
                        .find_payment_attempt_by_merchant_id_connector_txn_id(
                            merchant_id,
                            connector_txn_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                );

                let partition_key = PartitionKey::CombinationKey {
                    combination: &reverse_lookup.pk_id,
                };
                let keymanager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // KV path: read the stored attempt, then rebuild the domain model from it.
                let redis_fut = async {
                    let stored_attempt = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::HGet(&reverse_lookup.sk_id),
                        partition_key,
                    ))
                    .await?
                    .try_into_hget()?;

                    PaymentAttempt::convert_back(
                        keymanager_state,
                        stored_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                // Database path handed to the helper as the alternative to `redis_fut`.
                let database_call = || async {
                    self.router_store
                        .find_payment_attempt_by_merchant_id_connector_txn_id(
                            merchant_id,
                            connector_txn_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Finds a single payment attempt by payment id, merchant id and attempt id.
    ///
    /// The storage scheme is first resolved through `decide_storage_scheme` for `Op::Find`:
    ///
    /// - `PostgresOnly`: the lookup is delegated to `router_store`.
    /// - `RedisKv`: the attempt is read with an `HGet` on field `pa_{attempt_id}` inside the
    ///   `MerchantIdPaymentId` partition and converted into the domain model with
    ///   `PaymentAttempt::convert_back`. The `router_store` query is passed to
    ///   `try_redis_get_else_try_database_get` as the database alternative.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_payment_id_merchant_id_attempt_id(
        &self,
        payment_id: &common_utils::id_type::PaymentId,
        merchant_id: &common_utils::id_type::MerchantId,
        attempt_id: &str,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match storage_scheme {
            // Database-only merchants never touch the KV store.
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .find_payment_attempt_by_payment_id_merchant_id_attempt_id(
                        payment_id,
                        merchant_id,
                        attempt_id,
                        storage_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let partition_key = PartitionKey::MerchantIdPaymentId {
                    merchant_id,
                    payment_id,
                };
                let hash_field = format!("pa_{attempt_id}");
                let keymanager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // KV path: read the stored attempt, then rebuild the domain model from it.
                let redis_fut = async {
                    let stored_attempt = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::HGet(&hash_field),
                        partition_key,
                    ))
                    .await?
                    .try_into_hget()?;

                    PaymentAttempt::convert_back(
                        keymanager_state,
                        stored_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                // Database path handed to the helper as the alternative to `redis_fut`.
                let database_call = || async {
                    self.router_store
                        .find_payment_attempt_by_payment_id_merchant_id_attempt_id(
                            payment_id,
                            merchant_id,
                            attempt_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Finds a payment attempt by attempt id and merchant id.
    ///
    /// The storage scheme is first resolved through `decide_storage_scheme` for `Op::Find`:
    ///
    /// - `PostgresOnly`: the lookup is delegated to `router_store`.
    /// - `RedisKv`: the reverse lookup stored under `pa_{merchant_id}_{attempt_id}` provides
    ///   the partition key (`pk_id`) and hash field (`sk_id`) for an `HGet`, and the fetched
    ///   record is converted into the domain model with `PaymentAttempt::convert_back`. The
    ///   `router_store` query is passed as the fallback to both
    ///   `fallback_reverse_lookup_not_found!` and `try_redis_get_else_try_database_get`.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_attempt_id_merchant_id(
        &self,
        attempt_id: &str,
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match storage_scheme {
            // Database-only merchants never touch the KV store.
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .find_payment_attempt_by_attempt_id_merchant_id(
                        attempt_id,
                        merchant_id,
                        storage_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let reverse_lookup_id =
                    format!("pa_{}_{attempt_id}", merchant_id.get_string_repr());
                let reverse_lookup = fallback_reverse_lookup_not_found!(
                    self.get_lookup_by_lookup_id(&reverse_lookup_id, storage_scheme)
                        .await,
                    self.router_store
                        .find_payment_attempt_by_attempt_id_merchant_id(
                            attempt_id,
                            merchant_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                );

                let partition_key = PartitionKey::CombinationKey {
                    combination: &reverse_lookup.pk_id,
                };
                let keymanager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // KV path: read the stored attempt, then rebuild the domain model from it.
                let redis_fut = async {
                    let stored_attempt = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::HGet(&reverse_lookup.sk_id),
                        partition_key,
                    ))
                    .await?
                    .try_into_hget()?;

                    PaymentAttempt::convert_back(
                        keymanager_state,
                        stored_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                // Database path handed to the helper as the alternative to `redis_fut`.
                let database_call = || async {
                    self.router_store
                        .find_payment_attempt_by_attempt_id_merchant_id(
                            attempt_id,
                            merchant_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Looks up a payment attempt by its global attempt id.
    ///
    /// A read connection is obtained via `pg_connection_read`, and `find_resource_by_id`
    /// receives both the database query and the lookup id produced by
    /// `label::get_global_id_label`.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_id(
        &self,
        merchant_key_store: &MerchantKeyStore,
        attempt_id: &common_utils::id_type::GlobalAttemptId,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let read_conn = pg_connection_read(self).await?;

        let database_query = DieselPaymentAttempt::find_by_id(&read_conn, attempt_id);
        let lookup_label = label::get_global_id_label(attempt_id);

        self.find_resource_by_id(
            merchant_key_store,
            storage_scheme,
            database_query,
            FindResourceBy::LookupId(lookup_label),
        )
        .await
    }

    /// Lists the payment attempts that belong to a global payment id.
    ///
    /// `filter_resources` is given the database query, a predicate that accepts every
    /// record, and scan parameters targeting the `pa_*` pattern within the
    /// `GlobalPaymentId` partition with no limit.
    #[cfg(feature = "v2")]
    #[instrument(skip_all)]
    async fn find_payment_attempts_by_payment_intent_id(
        &self,
        payment_id: &common_utils::id_type::GlobalPaymentId,
        merchant_key_store: &MerchantKeyStore,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<Vec<PaymentAttempt>, errors::StorageError> {
        let read_conn = pg_connection_read(self).await?;

        let database_query = DieselPaymentAttempt::find_by_payment_id(&read_conn, payment_id);
        let scan_params = FilterResourceParams {
            key: PartitionKey::GlobalPaymentId { id: payment_id },
            pattern: "pa_*",
            limit: None,
        };

        self.filter_resources(
            merchant_key_store,
            storage_scheme,
            database_query,
            // Accept every record: no additional filtering is applied here.
            |_| true,
            scan_params,
        )
        .await
    }

    /// Finds a payment attempt by preprocessing id and merchant id.
    ///
    /// The storage scheme is first resolved through `decide_storage_scheme` for `Op::Find`:
    ///
    /// - `PostgresOnly`: the lookup is delegated to `router_store`.
    /// - `RedisKv`: the reverse lookup stored under
    ///   `pa_preprocessing_{merchant_id}_{preprocessing_id}` provides the partition key
    ///   (`pk_id`) and hash field (`sk_id`) for an `HGet`, and the fetched record is converted
    ///   into the domain model with `PaymentAttempt::convert_back`. The `router_store` query
    ///   is passed as the fallback to both `fallback_reverse_lookup_not_found!` and
    ///   `try_redis_get_else_try_database_get`.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_payment_attempt_by_preprocessing_id_merchant_id(
        &self,
        preprocessing_id: &str,
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<PaymentAttempt, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match storage_scheme {
            // Database-only merchants never touch the KV store.
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .find_payment_attempt_by_preprocessing_id_merchant_id(
                        preprocessing_id,
                        merchant_id,
                        storage_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let reverse_lookup_id = format!(
                    "pa_preprocessing_{}_{preprocessing_id}",
                    merchant_id.get_string_repr()
                );
                let reverse_lookup = fallback_reverse_lookup_not_found!(
                    self.get_lookup_by_lookup_id(&reverse_lookup_id, storage_scheme)
                        .await,
                    self.router_store
                        .find_payment_attempt_by_preprocessing_id_merchant_id(
                            preprocessing_id,
                            merchant_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                );

                let partition_key = PartitionKey::CombinationKey {
                    combination: &reverse_lookup.pk_id,
                };
                let keymanager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // KV path: read the stored attempt, then rebuild the domain model from it.
                let redis_fut = async {
                    let stored_attempt = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::HGet(&reverse_lookup.sk_id),
                        partition_key,
                    ))
                    .await?
                    .try_into_hget()?;

                    PaymentAttempt::convert_back(
                        keymanager_state,
                        stored_attempt,
                        merchant_key_store.key.get_inner(),
                        merchant_id.clone().into(),
                    )
                    .await
                    .change_context(redis_interface::errors::RedisError::UnknownResult)
                    .attach_printable("Error while constructing domain model")
                };

                // Database path handed to the helper as the alternative to `redis_fut`.
                let database_call = || async {
                    self.router_store
                        .find_payment_attempt_by_preprocessing_id_merchant_id(
                            preprocessing_id,
                            merchant_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Lists every payment attempt recorded for a merchant id / payment id pair.
    ///
    /// The storage scheme is first resolved through `decide_storage_scheme` for `Op::Find`:
    ///
    /// - `PostgresOnly`: the lookup is delegated to `router_store`.
    /// - `RedisKv`: the `MerchantIdPaymentId` partition is scanned for `pa_*` fields and each
    ///   stored attempt is converted into the domain model with
    ///   `PaymentAttempt::convert_back`; `try_join_all` yields the first conversion error,
    ///   if any. The `router_store` query is passed to
    ///   `try_redis_get_else_try_database_get` as the database alternative.
    ///
    /// NOTE(review): this method is gated on `v1` only, while `try_join_all` is imported at
    /// the top of the file under `all(feature = "v1", feature = "olap")` — confirm the crate
    /// is never built with `v1` but without `olap`.
    #[cfg(feature = "v1")]
    #[instrument(skip_all)]
    async fn find_attempts_by_merchant_id_payment_id(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        payment_id: &common_utils::id_type::PaymentId,
        storage_scheme: MerchantStorageScheme,
        merchant_key_store: &MerchantKeyStore,
    ) -> error_stack::Result<Vec<PaymentAttempt>, errors::StorageError> {
        let storage_scheme = Box::pin(decide_storage_scheme::<_, DieselPaymentAttempt>(
            self,
            storage_scheme,
            Op::Find,
        ))
        .await;

        match storage_scheme {
            // Database-only merchants never touch the KV store.
            MerchantStorageScheme::PostgresOnly => {
                self.router_store
                    .find_attempts_by_merchant_id_payment_id(
                        merchant_id,
                        payment_id,
                        storage_scheme,
                        merchant_key_store,
                    )
                    .await
            }
            MerchantStorageScheme::RedisKv => {
                let partition_key = PartitionKey::MerchantIdPaymentId {
                    merchant_id,
                    payment_id,
                };
                let keymanager_state = self
                    .get_keymanager_state()
                    .attach_printable("Missing KeyManagerState")?;

                // KV path: scan the partition, then rebuild one domain model per stored attempt.
                let redis_fut = async {
                    let stored_attempts = Box::pin(kv_wrapper(
                        self,
                        KvOperation::<DieselPaymentAttempt>::Scan("pa_*"),
                        partition_key,
                    ))
                    .await?
                    .try_into_scan()?;

                    let conversions = stored_attempts.into_iter().map(|stored_attempt| {
                        PaymentAttempt::convert_back(
                            keymanager_state,
                            stored_attempt,
                            merchant_key_store.key.get_inner(),
                            merchant_key_store.merchant_id.clone().into(),
                        )
                    });

                    try_join_all(conversions)
                        .await
                        .change_context(errors::RedisError::UnknownResult)
                        .attach_printable("Error while constructing domain model")
                };

                // Database path handed to the helper as the alternative to `redis_fut`.
                let database_call = || async {
                    self.router_store
                        .find_attempts_by_merchant_id_payment_id(
                            merchant_id,
                            payment_id,
                            storage_scheme,
                            merchant_key_store,
                        )
                        .await
                };

                Box::pin(try_redis_get_else_try_database_get(
                    redis_fut,
                    database_call,
                ))
                .await
            }
        }
    }

    /// Returns the payment list filters for the given payment intents.
    ///
    /// This is a straight delegation: all arguments are forwarded to `router_store`.
    #[cfg(all(feature = "v1", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_filters_for_payments(
        &self,
        pi: &[PaymentIntent],
        merchant_id: &common_utils::id_type::MerchantId,
        storage_scheme: MerchantStorageScheme,
    ) -> error_stack::Result<PaymentListFilters, errors::StorageError> {
        let database_store = &self.router_store;

        database_store
            .get_filters_for_payments(pi, merchant_id, storage_scheme)
            .await
    }

    /// Returns the number of payment attempts matching the supplied filters (v1 signature).
    ///
    /// This is a straight delegation: every argument is forwarded, in the same order, to
    /// `router_store`.
    #[cfg(all(feature = "v1", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_total_count_of_filtered_payment_attempts(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        active_attempt_ids: &[String],
        connector: Option<Vec<api_models::enums::Connector>>,
        payment_method: Option<Vec<common_enums::PaymentMethod>>,
        payment_method_type: Option<Vec<common_enums::PaymentMethodType>>,
        authentication_type: Option<Vec<common_enums::AuthenticationType>>,
        merchant_connector_id: Option<Vec<common_utils::id_type::MerchantConnectorAccountId>>,
        card_network: Option<Vec<common_enums::CardNetwork>>,
        card_discovery: Option<Vec<common_enums::CardDiscovery>>,
        storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<i64, errors::StorageError> {
        let database_store = &self.router_store;

        database_store
            .get_total_count_of_filtered_payment_attempts(
                merchant_id,
                active_attempt_ids,
                connector,
                payment_method,
                payment_method_type,
                authentication_type,
                merchant_connector_id,
                card_network,
                card_discovery,
                storage_scheme,
            )
            .await
    }
    /// Returns the number of payment attempts matching the supplied filters (v2 signature).
    ///
    /// This is a straight delegation: every argument is forwarded, in the same order, to
    /// `router_store`.
    #[cfg(all(feature = "v2", feature = "olap"))]
    #[instrument(skip_all)]
    async fn get_total_count_of_filtered_payment_attempts(
        &self,
        merchant_id: &common_utils::id_type::MerchantId,
        active_attempt_ids: &[String],
        connector: Option<Vec<api_models::enums::Connector>>,
        payment_method_type: Option<Vec<common_enums::PaymentMethod>>,
        payment_method_subtype: Option<Vec<common_enums::PaymentMethodType>>,
        authentication_type: Option<Vec<common_enums::AuthenticationType>>,
        merchant_connector_id: Option<Vec<common_utils::id_type::MerchantConnectorAccountId>>,
        card_network: Option<Vec<common_enums::CardNetwork>>,
        storage_scheme: MerchantStorageScheme,
    ) -> CustomResult<i64, errors::StorageError> {
        let database_store = &self.router_store;

        database_store
            .get_total_count_of_filtered_payment_attempts(
                merchant_id,
                active_attempt_ids,
                connector,
                payment_method_type,
                payment_method_subtype,
                authentication_type,
                merchant_connector_id,
                card_network,
                storage_scheme,
            )
            .await
    }
}

/// Field-for-field conversion between the domain `MandateAmountData` and its storage
/// counterpart `DieselMandateAmountData`.
impl DataModelExt for MandateAmountData {
    type StorageModel = DieselMandateAmountData;

    /// Moves every field of the domain value, unchanged, into a storage model.
    fn to_storage_model(self) -> Self::StorageModel {
        let Self {
            amount,
            currency,
            start_date,
            end_date,
            metadata,
        } = self;

        DieselMandateAmountData {
            amount,
            currency,
            start_date,
            end_date,
            metadata,
        }
    }

    /// Moves every field of the storage model, unchanged, into a domain value.
    fn from_storage_model(storage_model: Self::StorageModel) -> Self {
        let DieselMandateAmountData {
            amount,
            currency,
            start_date,
            end_date,
            metadata,
        } = storage_model;

        Self {
            amount,
            currency,
            start_date,
            end_date,
            metadata,
        }
    }
}
/// Conversion between the domain `MandateDetails` and its storage counterpart
/// `DieselMandateDetails`; the single `update_mandate_id` field is carried over as is.
impl DataModelExt for MandateDetails {
    type StorageModel = DieselMandateDetails;

    /// Moves `update_mandate_id` into a storage model.
    fn to_storage_model(self) -> Self::StorageModel {
        let Self { update_mandate_id } = self;
        DieselMandateDetails { update_mandate_id }
    }

    /// Moves `update_mandate_id` out of the storage model into a domain value.
    fn from_storage_model(storage_model: Self::StorageModel) -> Self {
        let DieselMandateDetails { update_mandate_id } = storage_model;
        Self { update_mandate_id }
    }
}

impl DataModelExt for MandateDataType {
    type StorageModel = DieselMandateType;

    fn to_storage_model(self) -> Self::StorageModel {
        match self {
            Self::SingleUse(data) => DieselMandateType::SingleUse(data.to_storage_model()),
            Self::MultiUse(None) => DieselMandateType::MultiUse(None),
            Self::MultiUse(Some(data)) => {
                DieselMandateType::MultiUse(Some(data.to_storage_model()))
            }
        }
    }

    fn from_storage_model(storage_model: Self::StorageModel) -> Self {
        match storage_model {
            DieselMandateType::SingleUse(data) => {
                Self::SingleUse(MandateAmountData::from_storage_model(data))
            }
            DieselMandateType::MultiUse(Some(data)) => {
                Self::MultiUse(Some(MandateAmountData::from_storage_model(data)))
            }
            DieselMandateType::MultiUse(None) => Self::MultiUse(None),
        }
    }
}

/// Inserts a reverse-lookup entry keyed by connector transaction id.
///
/// The entry is built as follows:
/// - `lookup_id`: `pa_conn_trans_{merchant_id}_{connector_transaction_id}`
/// - `pk_id`: the supplied `key`
/// - `sk_id`: `pa_{updated_attempt_attempt_id}`
/// - `source`: `payment_attempt`
/// - `updated_by`: the string form of `storage_scheme`
///
/// Returns whatever `insert_reverse_lookup` on `store` returns.
#[inline]
#[instrument(skip_all)]
async fn add_connector_txn_id_to_reverse_lookup<T: DatabaseStore>(
    store: &KVRouterStore<T>,
    key: &str,
    merchant_id: &common_utils::id_type::MerchantId,
    updated_attempt_attempt_id: &str,
    connector_transaction_id: &str,
    storage_scheme: MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
    let reverse_lookup_new = ReverseLookupNew {
        lookup_id: format!(
            "pa_conn_trans_{}_{connector_transaction_id}",
            merchant_id.get_string_repr()
        ),
        pk_id: key.to_owned(),
        // Built in place: the field name has no other use, so no clone is needed.
        sk_id: format!("pa_{updated_attempt_attempt_id}"),
        source: "payment_attempt".to_string(),
        updated_by: storage_scheme.to_string(),
    };

    store
        .insert_reverse_lookup(reverse_lookup_new, storage_scheme)
        .await
}

/// Inserts a reverse-lookup entry keyed by preprocessing id.
///
/// The entry is built as follows:
/// - `lookup_id`: `pa_preprocessing_{merchant_id}_{preprocessing_id}`
/// - `pk_id`: the supplied `key`
/// - `sk_id`: `pa_{updated_attempt_attempt_id}`
/// - `source`: `payment_attempt`
/// - `updated_by`: the string form of `storage_scheme`
///
/// Returns whatever `insert_reverse_lookup` on `store` returns.
#[inline]
#[instrument(skip_all)]
async fn add_preprocessing_id_to_reverse_lookup<T: DatabaseStore>(
    store: &KVRouterStore<T>,
    key: &str,
    merchant_id: &common_utils::id_type::MerchantId,
    updated_attempt_attempt_id: &str,
    preprocessing_id: &str,
    storage_scheme: MerchantStorageScheme,
) -> CustomResult<ReverseLookup, errors::StorageError> {
    let reverse_lookup_new = ReverseLookupNew {
        lookup_id: format!(
            "pa_preprocessing_{}_{preprocessing_id}",
            merchant_id.get_string_repr()
        ),
        pk_id: key.to_owned(),
        // Built in place: the field name has no other use, so no clone is needed.
        sk_id: format!("pa_{updated_attempt_attempt_id}"),
        source: "payment_attempt".to_string(),
        updated_by: storage_scheme.to_string(),
    };

    store
        .insert_reverse_lookup(reverse_lookup_new, storage_scheme)
        .await
}

/// Constants and lookup-id builders for v2 payment attempts.
///
/// The parent module pulls these in with `use label::*` when the `v2` feature is enabled.
#[cfg(feature = "v2")]
mod label {
    /// Model name string for v2 payment attempts.
    pub(super) const MODEL_NAME: &str = "payment_attempt_v2";
    /// Short cluster label string for payment attempts.
    pub(super) const CLUSTER_LABEL: &str = "pa";

    /// Builds `profile_{profile_id}_conn_txn_{connector_transaction_id}`.
    pub(super) fn get_profile_id_connector_transaction_label(
        profile_id: &str,
        connector_transaction_id: &str,
    ) -> String {
        [
            "profile_",
            profile_id,
            "_conn_txn_",
            connector_transaction_id,
        ]
        .concat()
    }

    /// Builds `attempt_global_id_{id}` from the string representation of `attempt_id`.
    pub(super) fn get_global_id_label(
        attempt_id: &common_utils::id_type::GlobalAttemptId,
    ) -> String {
        let attempt_id_repr = attempt_id.get_string_repr();
        format!("attempt_global_id_{attempt_id_repr}")
    }
}
